{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "cd2b0059",
   "metadata": {
    "tags": []
   },
   "source": [
    "# Case Study: Real-Time Credit Card Fraud Detection\n",
    "\n",
    "## Background: real-time feature engineering with DBSP\n",
    "\n",
    "**Feature engineering** is the process of transforming raw data into a set of features that can be used to improve the predictive accuracy of an ML model.  Features are often expressed as SQL queries that filter, transform, aggregate, and join raw data.  These queries are evaluated by an RDBMS, e.g., Postgres, and the results are fed to the ML model during training and inference stages.\n",
    "\n",
    "**Real-time feature engineering** arises in applications where data arrives continuously and requires immediate analysis, such as fraud detection and anomaly detection.  The main challenge in this process is extracting features from constantly changing data.  Although simple cases can be handled by streaming analytics platforms like Flink, they fall short when it comes to complex SQL queries that feature engineers commonly use (we will see examples of such queries in this case study!).  A common workaround is to precompute features through periodic batch jobs in an RDBMS such as BigQuery and inject the precomputed features into the real-time data stream.  This approach allows arbitrary feauture queries but sacrifices **feature freshness**, resulting in poor prediction accuracy in many real-time ML applications since precomputed features do not reflect the latest input data.\n",
    "\n",
    "**DBSP aims to provide the benefits of both worlds** by evaluating complex feature queries directly on streaming data, eliminating the need for batch jobs and delivering perfect feature freshness.\n",
    "\n",
    "## About this case study\n",
    "\n",
    "Our goal in this case study is two-fold:\n",
    "\n",
    "1. To illustrate how ML engineers can invoke DBSP from a Jupyter notebook to evaluate feature extraction queries on streaming data during model training, testing, and inference.\n",
    "1. To empirically prove that DBSP enhances prediction accuracy in real-time ML.  Specifically, we demonstrate that **both complex queries and data freshness are critical for achieving high accuracy in real-time ML applications**.\n",
    "\n",
    "This case study is based on the credit card fraud detection solution published by the Google Cloud blog:\n",
    "https://cloud.google.com/blog/products/data-analytics/how-to-build-a-fraud-detection-solution"
   ]
  },
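  {
   "cell_type": "markdown",
   "id": "a7c1e4b2",
   "metadata": {},
   "source": [
    "To make the freshness argument concrete, here is a minimal sketch in plain Python (it does not use DBSP). The card history, the batch time, and the `trans_freq_24` helper are made up for illustration; the helper counts transactions in the 24 hours before a given instant, in the spirit of the `trans_freq_24` feature defined later in this notebook.\n",
    "\n",
    "```python\n",
    "DAY = 86400  # seconds\n",
    "\n",
    "def trans_freq_24(txn_times, as_of):\n",
    "    # Count transactions in the 24 hours strictly before `as_of`\n",
    "    return sum(1 for t in txn_times if as_of - DAY <= t < as_of)\n",
    "\n",
    "# Hypothetical card: one purchase shortly before the batch job ran, then a\n",
    "# burst of five purchases in the ten minutes before the transaction being scored.\n",
    "batch_run = 100_000          # when the periodic batch job last ran\n",
    "now = batch_run + 6 * 3600   # the transaction being scored, six hours later\n",
    "history = [batch_run - 3600] + [now - 600 + 120 * i for i in range(5)]\n",
    "\n",
    "stale = trans_freq_24(history, as_of=batch_run)  # what the batch job precomputed\n",
    "fresh = trans_freq_24(history, as_of=now)        # what a query over the live stream sees\n",
    "print(f'stale={stale} fresh={fresh}')\n",
    "```\n",
    "\n",
    "Under these assumptions the precomputed value is 1 while the fresh value is 6: the burst that makes the transaction suspicious is invisible to a model fed the stale feature."
   ]
  },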
  {
   "cell_type": "code",
   "execution_count": 97,
   "id": "bd0696ef",
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import tarfile\n",
    "import gdown\n",
    "from os import path\n",
    "import pandas as pd\n",
    "from datetime import datetime\n",
    "import geopy\n",
    "import geopy.distance\n",
    "from sklearn.preprocessing import LabelEncoder\n",
    "from xgboost import XGBClassifier\n",
    "from sklearn.model_selection import train_test_split\n",
    "from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, precision_recall_curve, roc_curve\n",
    "from collections import Counter\n",
    "from sklearn.utils import shuffle\n",
    "import warnings\n",
    "import urllib.request\n",
    "import json\n",
    "\n",
    "warnings.filterwarnings('ignore')\n",
    "\n",
    "train_url = \"https://raw.githubusercontent.com/feldera/ci_datasets/0eed3caac45415f6cd531dc5cb2e7f531d09a823/train_ci.csv\"\n",
    "test_url = \"https://raw.githubusercontent.com/feldera/ci_datasets/0eed3caac45415f6cd531dc5cb2e7f531d09a823/test_ci.csv\"\n",
    "simulation_url = \"https://raw.githubusercontent.com/feldera/ci_datasets/0eed3caac45415f6cd531dc5cb2e7f531d09a823/simulation_ci.csv\"\n",
    "demographics_url = \"https://github.com/feldera/ci_datasets/raw/e06f92fbd74f11d96ceb8b19c7692d05c7aca6d0/demographics.csv\"\n",
    "\n",
    "train_outpath        = path.abspath(\"fraud_data/train_output.csv\")\n",
    "test_outpath         = path.abspath(\"fraud_data/test_output.csv\")\n",
    "simulation_outpath   = path.abspath(\"fraud_data/simulation_output.csv\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "37021d75-be0b-44ab-acc3-887c98af9dba",
   "metadata": {},
   "source": [
    "## Prepare a feature extraction query"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 98,
   "id": "89152a2c",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Compiling program demo-fraud-detection-notebook-program (version: 1)...\n",
      "Program status: Pending\n",
      "Program status: CompilingRust\n",
      "Program status: CompilingRust\n",
      "Program status: CompilingRust\n",
      "Program status: CompilingRust\n",
      "Program status: CompilingRust\n",
      "Program status: Success\n",
      "done\n"
     ]
    }
   ],
   "source": [
    "import os\n",
    "import time\n",
    "import requests\n",
    "\n",
    "# API URL\n",
    "api_url = \"http://localhost:8080\"\n",
    "\n",
    "# Program\n",
    "program_name = \"demo-fraud-detection-notebook-program\"\n",
    "program_sql = \"\"\"\n",
    "CREATE TABLE demographics (\n",
    "    cc_num FLOAT64 NOT NULL,\n",
    "    first STRING,\n",
    "    gender STRING,\n",
    "    street STRING,\n",
    "    city STRING,\n",
    "    state STRING,\n",
    "    zip INTEGER,\n",
    "    lat FLOAT64 NOT NULL,\n",
    "    long FLOAT64 NOT NULL,\n",
    "    city_pop INTEGER,\n",
    "    job STRING,\n",
    "    dob STRING\n",
    "    --dob DATE\n",
    ");\n",
    "\n",
    "CREATE TABLE transactions (\n",
    "    trans_date_trans_time TIMESTAMP NOT NULL,\n",
    "    cc_num FLOAT64 NOT NULL,\n",
    "    merchant STRING,\n",
    "    category STRING,\n",
    "    amt FLOAT64,\n",
    "    trans_num STRING,\n",
    "    unix_time INTEGER NOT NULL,\n",
    "    merch_lat FLOAT64 NOT NULL,\n",
    "    merch_long FLOAT64 NOT NULL,\n",
    "    is_fraud INTEGER\n",
    ");\n",
    "\n",
    "CREATE VIEW features as\n",
    "    SELECT\n",
    "        DAYOFWEEK(trans_date_trans_time) AS d,\n",
    "        -- TIMESTAMPDIFF(YEAR, trans_date_trans_time, CAST(dob as TIMESTAMP)) AS age,\n",
    "        \"ST_DISTANCE\"(\"ST_POINT\"(long,lat), \"ST_POINT\"(merch_long,merch_lat)) AS distance,\n",
    "        -- TIMESTAMPDIFF(MINUTE, trans_date_trans_time, last_txn_date) AS trans_diff,\n",
    "        AVG(amt) OVER(\n",
    "            PARTITION BY   CAST(cc_num AS NUMERIC)\n",
    "            ORDER BY unix_time\n",
    "            -- 1 week is 604800  seconds\n",
    "            RANGE BETWEEN 604800  PRECEDING AND 1 PRECEDING) AS\n",
    "        avg_spend_pw,\n",
    "        AVG(amt) OVER(\n",
    "            PARTITION BY  CAST(cc_num AS NUMERIC)\n",
    "            ORDER BY unix_time\n",
    "            -- 1 month(30 days) is 2592000 seconds\n",
    "            RANGE BETWEEN 2592000 PRECEDING AND 1 PRECEDING) AS\n",
    "        avg_spend_pm,\n",
    "        COUNT(*) OVER(\n",
    "            PARTITION BY  CAST(cc_num AS NUMERIC)\n",
    "            ORDER BY unix_time\n",
    "            -- 1 day is 86400  seconds\n",
    "            RANGE BETWEEN 86400  PRECEDING AND 1 PRECEDING ) AS\n",
    "        trans_freq_24,\n",
    "        category,\n",
    "        amt,\n",
    "        state,\n",
    "        job,\n",
    "        unix_time,\n",
    "        city_pop,\n",
    "        merchant,\n",
    "        is_fraud\n",
    "    FROM (\n",
    "        SELECT t1.*, t2.*\n",
    "               -- , LAG(trans_date_trans_time, 1) OVER (PARTITION BY t1.cc_num  ORDER BY trans_date_trans_time ASC) AS last_txn_date\n",
    "        FROM  transactions AS t1\n",
    "        JOIN  demographics AS t2\n",
    "        ON t1.cc_num = t2.cc_num);\"\"\"\n",
    "\n",
    "# Create program\n",
    "response = requests.put(f\"{api_url}/v0/programs/{program_name}\", json={\n",
    "    \"description\": \"\",\n",
    "    \"code\": program_sql\n",
    "})\n",
    "response.raise_for_status()\n",
    "program_version = response.json()[\"version\"]\n",
    "\n",
    "# Compile program\n",
    "print(f\"Compiling program {program_name} (version: {program_version})...\")\n",
    "requests.post(f\"{api_url}/v0/programs/{program_name}/compile\", json={\"version\": program_version}).raise_for_status()\n",
    "while True:\n",
    "    status = requests.get(f\"{api_url}/v0/programs/{program_name}\").json()[\"status\"]\n",
    "    print(f\"Program status: {status}\")\n",
    "    if status == \"Success\":\n",
    "        break\n",
    "    elif status != \"Pending\" and status != \"CompilingRust\" and status != \"CompilingSql\":\n",
    "        raise RuntimeError(f\"Failed program compilation with status {status}\")\n",
    "    time.sleep(5)\n",
    "print(\"done\")"
   ]
  },
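  {
   "cell_type": "markdown",
   "id": "c39d5f10",
   "metadata": {},
   "source": [
    "The window frames in the `features` view can be hard to read at first. As a rough illustration (plain Python, not what DBSP executes internally), the sketch below computes `AVG(amt)` over `RANGE BETWEEN 604800 PRECEDING AND 1 PRECEDING` for the rows of a single card: for a row at time `t` it averages the rows whose `unix_time` lies in `[t - 604800, t - 1]`, so the current row is excluded. The helper name `trailing_avg` and the sample rows are made up.\n",
    "\n",
    "```python\n",
    "from bisect import bisect_left\n",
    "\n",
    "WEEK = 604800  # seconds, as in the SQL above\n",
    "\n",
    "def trailing_avg(rows, window):\n",
    "    # rows: (unix_time, amt) pairs for ONE card, sorted by integer unix_time\n",
    "    times = [t for t, _ in rows]\n",
    "    prefix = [0.0]  # prefix[i] is the sum of the first i amounts\n",
    "    for _, amt in rows:\n",
    "        prefix.append(prefix[-1] + amt)\n",
    "    result = []\n",
    "    for t in times:\n",
    "        lo = bisect_left(times, t - window)  # first row with unix_time >= t - window\n",
    "        hi = bisect_left(times, t)           # first row with unix_time >= t\n",
    "        # An empty frame yields None, mirroring SQL's NULL for AVG\n",
    "        result.append((prefix[hi] - prefix[lo]) / (hi - lo) if hi > lo else None)\n",
    "    return result\n",
    "\n",
    "rows = [(0, 10.0), (100, 20.0), (604800, 30.0), (604901, 40.0)]\n",
    "print(trailing_avg(rows, WEEK))\n",
    "```\n",
    "\n",
    "This prints `[None, 10.0, 15.0, 30.0]`. The last row averages only the 30.0 purchase, because the first two fall just outside its one-week frame. In the real query, `PARTITION BY` applies the same computation to each card separately."
   ]
  },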
  {
   "cell_type": "code",
   "execution_count": 99,
   "id": "7886a22c",
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "import threading\n",
    "\n",
    "def wait_for_pipeline(pipeline_name: str):\n",
    "    # Wait till the pipeline is completed\n",
    "    while not requests.get(f\"{api_url}/v0/pipelines/{pipeline_name}/stats\") \\\n",
    "            .json()[\"global_metrics\"][\"pipeline_complete\"]:\n",
    "        time.sleep(1)\n",
    "    \n",
    "    requests.post(f\"{api_url}/v0/pipelines/{pipeline_name}/shutdown\").raise_for_status()\n",
    "\n",
    "\n",
    "def run_query(transaction_url: str, output_file: str):\n",
    "\n",
    "    # Connectors\n",
    "    connectors = []\n",
    "    for (connector_name, stream, url, is_input) in [\n",
    "        (\"demographics\", 'DEMOGRAPHICS', demographics_url, True),\n",
    "        (\"transactions\", 'TRANSACTIONS', transaction_url, True),\n",
    "    ]:\n",
    "        requests.put(f\"{api_url}/v0/connectors/{connector_name}\", json={\n",
    "            \"description\": \"\",\n",
    "            \"config\": {\n",
    "               \"format\": { \"name\": \"csv\", \"config\": {} },\n",
    "               \"transport\": { \"name\": \"url_input\", \"config\": { \"path\": url, } }\n",
    "            }\n",
    "        })\n",
    "        connectors.append({ \"connector_name\": connector_name, \"is_input\": is_input, \"name\": connector_name, \"relation_name\": stream })\n",
    "    \n",
    "    # Create pipeline\n",
    "    pipeline_name = \"demo-fraud-detection-notebook-pipeline\"\n",
    "    requests.put(f\"{api_url}/v0/pipelines/{pipeline_name}\", json={\n",
    "        \"description\": \"\",\n",
    "        \"config\": {\"workers\": 6},\n",
    "        \"program_name\": program_name,\n",
    "        \"connectors\": connectors,\n",
    "    }).raise_for_status()\n",
    "    \n",
    "    # Start pipeline in paused state, so we don't miss any outputs\n",
    "    print(\"(Re)starting pipeline...\")\n",
    "    requests.post(f\"{api_url}/v0/pipelines/{pipeline_name}/shutdown\").raise_for_status()\n",
    "    while requests.get(f\"{api_url}/v0/pipelines/{pipeline_name}\").json()[\"state\"][\"current_status\"] != \"Shutdown\":\n",
    "        time.sleep(1)\n",
    "    requests.post(f\"{api_url}/v0/pipelines/{pipeline_name}/pause\").raise_for_status()\n",
    "    while requests.get(f\"{api_url}/v0/pipelines/{pipeline_name}\").json()[\"state\"][\"current_status\"] != \"Paused\":\n",
    "        time.sleep(1)\n",
    "        \n",
    "    print(\"Pipeline (re)started\")\n",
    "\n",
    "    # Start listening for the output stream\n",
    "    features = requests.post(f\"{api_url}/v0/pipelines/{pipeline_name}/egress/features?format=csv\", stream = True)   \n",
    "\n",
    "    # Run the pipeline\n",
    "    requests.post(f\"{api_url}/v0/pipelines/{pipeline_name}/start\").raise_for_status()\n",
    "    while requests.get(f\"{api_url}/v0/pipelines/{pipeline_name}\").json()[\"state\"][\"current_status\"] != \"Running\":\n",
    "        time.sleep(1)\n",
    "    \n",
    "    # Wait for the pipeline to complete. The thread will shutdown the pipeline when it's done,\n",
    "    # allowing the iteration below to terminate.\n",
    "    thread = threading.Thread(target=wait_for_pipeline(pipeline_name))\n",
    "    thread.start()\n",
    "        \n",
    "    print(f\"writing features to {output_file}\")\n",
    "    with open(output_file, \"w\") as feature_file:\n",
    "        for line in features.iter_lines():\n",
    "            if line:\n",
    "                data = json.loads(line).get(\"text_data\")\n",
    "                if data is not None:\n",
    "                    feature_file.write(data + '\\n')"
   ]
  },
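  {
   "cell_type": "markdown",
   "id": "e84b2a6d",
   "metadata": {},
   "source": [
    "`run_query` relies on a helper thread to end the output stream while the main thread is still reading it. The sketch below imitates that shape with the standard library only: a `queue.Queue` stands in for the HTTP response stream and a `None` sentinel stands in for the pipeline shutdown. Both stand-ins, and the names `stop_after` and `consume`, are assumptions made for illustration; they are not part of the DBSP API.\n",
    "\n",
    "```python\n",
    "import queue\n",
    "import threading\n",
    "import time\n",
    "\n",
    "def stop_after(delay_s, stream):\n",
    "    # Stand-in for the waiter: signal end of stream after a delay\n",
    "    time.sleep(delay_s)\n",
    "    stream.put(None)\n",
    "\n",
    "def consume(stream):\n",
    "    # Stand-in for the reader loop: collect items until the sentinel arrives\n",
    "    lines = []\n",
    "    while True:\n",
    "        item = stream.get()\n",
    "        if item is None:\n",
    "            break\n",
    "        lines.append(item)\n",
    "    return lines\n",
    "\n",
    "stream = queue.Queue()\n",
    "for i in range(3):\n",
    "    stream.put(f'row-{i}')\n",
    "\n",
    "# Pass the callable and its arguments separately. Writing\n",
    "# target=stop_after(0.1, stream) would run the function to completion\n",
    "# right here, before any thread is started.\n",
    "waiter = threading.Thread(target=stop_after, args=(0.1, stream))\n",
    "waiter.start()\n",
    "collected = consume(stream)\n",
    "waiter.join()\n",
    "print(collected)\n",
    "```\n",
    "\n",
    "The reader drains the three rows and then blocks until the waiter delivers the sentinel, which is the role the pipeline shutdown plays for the `features` stream."
   ]
  },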
  {
   "cell_type": "markdown",
   "id": "3ceee2c1-90af-4823-8521-733287328119",
   "metadata": {},
   "source": [
    "## Compute features on training and testing datasets"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 94,
   "id": "2cbfdd56",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "(Re)starting pipeline...\n",
      "Pipeline (re)started\n",
      "writing data to /home/leonid/projects/feldera-test/demo/demo_notebooks/fraud_data/train_output.csv\n",
      "Training pipeline finished\n",
      "(Re)starting pipeline...\n",
      "Pipeline (re)started\n",
      "writing data to /home/leonid/projects/feldera-test/demo/demo_notebooks/fraud_data/test_output.csv\n",
      "Test pipeline finished\n"
     ]
    }
   ],
   "source": [
    "run_query(train_url, train_outpath)\n",
    "print(\"Training pipeline finished\")\n",
    "run_query(test_url, test_outpath)\n",
    "print(\"Test pipeline finished\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "ee26c169",
   "metadata": {},
   "source": [
    "## Train XGBoost model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 86,
   "id": "74331f53",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Precision =     1.000\n",
      "Recall (TPR) =  1.000\n",
      "F1 = 1.000\n",
      "Precision =     0.984\n",
      "Recall (TPR) =  0.904\n",
      "F1 = 0.943\n",
      "XGBoost Accuracy: 99.83%\n"
     ]
    }
   ],
   "source": [
    "def show_data(cm, print_res = 0):\n",
    "    tp = cm[1,1]\n",
    "    fn = cm[1,0]\n",
    "    fp = cm[0,1]\n",
    "    tn = cm[0,0]\n",
    "    if print_res == 1:\n",
    "        pr = tp/(tp+fp)\n",
    "        rec=  tp/(tp+fn)\n",
    "        print('Precision =     {:.3f}'.format(pr))\n",
    "        print('Recall (TPR) =  {:.3f}'.format(rec))\n",
    "        #print('Fallout (FPR) = {:.3f}'.format(fp/(fp+tn)))\n",
    "        print('F1 = {:.3f}'.format(2*(pr*rec)/(pr+rec)))\n",
    "    return tp/(tp+fp), tp/(tp+fn), fp/(fp+tn)\n",
    "\n",
    "max_depth = 12\n",
    "n_estimators = 100\n",
    "\n",
    "traindata     = pd.read_csv(train_outpath, float_precision='round_trip')  \n",
    "train_dataset = shuffle(traindata)\n",
    "\n",
    "test_dataset     = pd.read_csv(test_outpath, float_precision='round_trip')  \n",
    "\n",
    "nb_cols = len(train_dataset.columns.tolist())\n",
    "    \n",
    "X_train = train_dataset.iloc[:, 0:nb_cols - 2].values\n",
    "y_train = train_dataset.iloc[:, nb_cols-2].values.astype(int)        \n",
    "\n",
    "X_test = test_dataset.iloc[:, 0:nb_cols - 2].values\n",
    "y_test = test_dataset.iloc[:, nb_cols-2].values.astype(int)    \n",
    "\n",
    "\n",
    "model = XGBClassifier(max_depth = max_depth,  n_estimators = n_estimators, objective = 'binary:logistic')#, scale_pos_weight= estimate) \n",
    "setattr(model, 'verbosity', 0)\n",
    "model.fit(X_train, y_train)\n",
    "\n",
    "# evaluate train data\n",
    "y_pred = model.predict(X_train)\n",
    "predictions = [round(value) for value in y_pred]\n",
    "cm = confusion_matrix(y_train, predictions)\n",
    "show_data(cm, print_res = 1)\n",
    "\n",
    "# evaluate for test data\n",
    "y_pred = model.predict(X_test)\n",
    "predictions = [round(value) for value in y_pred]\n",
    "cm = confusion_matrix(y_test, predictions)\n",
    "show_data(cm, print_res = 1)\n",
    "\n",
    "accuracy = accuracy_score(y_test, predictions)\n",
    "print(\"XGBoost Accuracy: %.2f%%\" % (accuracy * 100.0))"
   ]
  },
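  {
   "cell_type": "markdown",
   "id": "f05c7d93",
   "metadata": {},
   "source": [
    "Fraud is rare, so accuracy alone says little about a detector; that is why the cell above also prints precision, recall, and F1. The sketch below works through the arithmetic on made-up confusion-matrix counts (they are not results from this notebook). The helper `fraud_metrics` is hypothetical, and reporting 0.0 when a ratio is undefined is a convention chosen here, not something `show_data` does.\n",
    "\n",
    "```python\n",
    "def fraud_metrics(tn, fp, fn, tp):\n",
    "    # Precision, recall, F1 and accuracy from confusion-matrix counts\n",
    "    precision = tp / (tp + fp) if tp + fp else 0.0\n",
    "    recall = tp / (tp + fn) if tp + fn else 0.0\n",
    "    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0\n",
    "    accuracy = (tp + tn) / (tn + fp + fn + tp)\n",
    "    return precision, recall, f1, accuracy\n",
    "\n",
    "# Made-up counts: 10,010 transactions, 100 of them fraudulent\n",
    "model_like = fraud_metrics(tn=9900, fp=10, fn=20, tp=80)\n",
    "# A degenerate classifier that never flags anything\n",
    "never_flag = fraud_metrics(tn=9910, fp=0, fn=100, tp=0)\n",
    "\n",
    "for name, m in [('model_like', model_like), ('never_flag', never_flag)]:\n",
    "    print(name, ' '.join(f'{v:.3f}' for v in m))\n",
    "```\n",
    "\n",
    "With these counts the never-flag classifier still reaches about 99.0% accuracy while its recall is 0, whereas the first set of counts gives precision 0.889, recall 0.800, and F1 0.842."
   ]
  },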
  {
   "cell_type": "markdown",
   "id": "ea90ef39",
   "metadata": {},
   "source": [
    "## Inference"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 87,
   "id": "692c286a",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "(Re)starting pipeline...\n",
      "Pipeline (re)started\n"
     ]
    }
   ],
   "source": [
    "run_query(simulation_url, simulation_outpath)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 88,
   "id": "5f993f7e",
   "metadata": {
    "scrolled": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "reading data from /home/leonid/projects/feldera-test/demo/demo_notebooks/fraud_data/simulation_output.csv\n",
      "Precision =     0.964\n",
      "Recall (TPR) =  0.973\n",
      "F1 = 0.969\n",
      "XGBoost Accuracy: 99.93%\n",
      "Precision =     1.000\n",
      "Recall (TPR) =  0.861\n",
      "F1 = 0.925\n",
      "XGBoost Accuracy: 99.71%\n",
      "Precision =     0.993\n",
      "Recall (TPR) =  0.854\n",
      "F1 = 0.918\n",
      "XGBoost Accuracy: 99.77%\n",
      "Precision =     0.977\n",
      "Recall (TPR) =  0.909\n",
      "F1 = 0.942\n",
      "XGBoost Accuracy: 99.84%\n",
      "Precision =     0.968\n",
      "Recall (TPR) =  0.968\n",
      "F1 = 0.968\n",
      "XGBoost Accuracy: 99.93%\n"
     ]
    }
   ],
   "source": [
    "try:\n",
    "    chunksize = 1024*10\n",
    "    print(f\"reading data from {simulation_outpath}\")\n",
    "    simulation = pd.read_csv(simulation_outpath, iterator=True, chunksize=chunksize)\n",
    "    for simulation_batch in simulation:\n",
    "        nb_cols = len(simulation_batch.columns.tolist())        \n",
    "        X_simulation = simulation_batch.iloc[:, 0:nb_cols - 2].values\n",
    "        y_simulation = simulation_batch.iloc[:, nb_cols-2].values.astype(int)     \n",
    "        \n",
    "        y_pred = model.predict(X_simulation)    \n",
    "        predictions = [round(value) for value in y_pred]\n",
    "        cm = confusion_matrix(y_simulation, predictions)\n",
    "        show_data(cm, print_res = 1)    \n",
    "        accuracy = accuracy_score(y_simulation, predictions)\n",
    "        print(\"XGBoost Accuracy: %.2f%%\" % (accuracy * 100.0))\n",
    "     \n",
    "except Exception as err:\n",
    "    print(f\"Error: cannot read from the specified source {err}\")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
